package main

import (
	"log"
	"net"
	"sync"
	"time"

	"github.com/golang/protobuf/proto"
)

type Session struct {
	Gid    uint32
	Uid    uint32
	Key    uint64
	Con    net.Conn
	Buffer [2]*MessageArray
	Codec  *Codec
}

func NewSession(c net.Conn, rbs, wbs int) *Session {
	return &Session{
		Con:   c,
		Codec: NewCodec(c, rbs, wbs),
		Buffer: [2]*MessageArray{
			&MessageArray{},
			&MessageArray{},
		},
	}
}

type Forwarder struct {
	mtx       sync.RWMutex
	seq       uint32
	frq       int64
	usr       sync.Map // Session Map
	cnt       int
	left      int
	idle      bool
	idleStart time.Time
	status    int32
	ready     chan uint64
	quit      chan *Session
	pool      *sync.Pool
}

func NewForwarder(frq int64, cnt int) *Forwarder {
	return &Forwarder{
		frq:   frq,
		cnt:   cnt,
		ready: make(chan uint64, GC_SignalCache),
		quit:  make(chan *Session, GC_SignalCache),
	}
}

func (this *Forwarder) AddSession(ses *Session) {
	ses.Key = MakeUserKey(ses.Gid, ses.Uid)
	if v, ok := this.usr.Load(ses.Key); ok {
		// Remove Duplicate
		this.usr.Delete(ses.Key)
		oses := v.(*Session)
		if oses != nil {
			log.Printf("kick duplicate connection|Gid:%d|Uid:%d", ses.Gid, ses.Uid)
			oses.Con.Close()
		}
	}
	this.usr.Store(ses.Key, ses)
	go this.handle(ses)
}

func (this *Forwarder) handle(ses *Session) {
	for {
		frame, err := ses.Codec.Read()
		if err != nil {
			log.Printf("read error|err:%v\n", err)
			this.usr.Delete(ses.Key)
			return
		}
		switch frame.Proto {
		case CC_Forward:
			m := &Message{}
			err = proto.Unmarshal(frame.Content, m)
			m.Gid = ses.Gid
			m.Uid = ses.Uid
			if err != nil {
				log.Printf("unmarshal error|err:%v\n", err)
				this.usr.Delete(ses.Key)
				return
			}
			this.mtx.RLock()
			ses.Buffer[this.seq&0x1].Datas = append(ses.Buffer[this.seq&0x1].Datas, m)
			this.mtx.RUnlock()
			ses.Codec.PutFrame(frame)
		case CC_Ready:
			log.Printf("Gid:%d|Uid:%d|Ready", ses.Gid, ses.Uid)
			this.ready <- ses.Key
		case CC_End:
			log.Printf("Gid:%d|Uid:%d|Quit", ses.Gid, ses.Uid)
			this.quit <- ses
			return
		}
		ses.Codec.PutFrame(frame)
	}
}

func (this *Forwarder) Ready() <-chan struct{} {
	ready := make(chan struct{})
	go func() {
		m := make(map[uint64]bool)
		c := this.cnt
		for i := 0; i < c; {
			select {
			case k := <-this.ready:
				if _, ok := m[k]; !ok {
					m[k] = true
					i++
				}
			case <-time.After(GC_ReadyTimeout):
				ready <- struct{}{}
				return
			}
		}
		ready <- struct{}{}
	}()
	return ready
}

func (this *Forwarder) Quit() bool {
	if this.left == 0 {
		return true
	}
	if this.idle && time.Since(this.idleStart) > GC_IdleTimeout {
		return true
	}
	return false
}

func (this *Forwarder) Start() {
	// Wait User Ready
	select {
	case <-this.Ready():
	}

	// Begin Broadcast
	frameBegin := &Frame{Proto: CC_Begin}
	this.usr.Range(func(key interface{}, value interface{}) bool {
		ses := value.(*Session)
		err := ses.Codec.Write(frameBegin)
		if err != nil {
			panic(err)
		}
		return true
	})

	this.left = this.cnt
	this.idle = false
	this.idleStart = time.Time{}
	this.status = GC_StatusRunning

	ma := &MessageArray{}
	frameBroadcast := &Frame{Proto: CC_Broadcast}
	pbuffer := proto.NewBuffer(nil)
	ticker := time.NewTicker(time.Second / time.Duration(this.frq))

	for !this.Quit() {
		select {
		case <-ticker.C:
			bseq := this.seq & 0x1

			this.mtx.Lock()
			this.seq++
			this.mtx.Unlock()

			ma.Seq = this.seq
			ma.Datas = ma.Datas[:0]
			this.usr.Range(func(key interface{}, value interface{}) bool {
				ses := value.(*Session)
				if ses != nil {
					for _, m := range ses.Buffer[bseq].Datas {
						ma.Datas = append(ma.Datas, m)
					}
					ses.Buffer[bseq].Datas = ses.Buffer[bseq].Datas[:0]
				}
				return true
			})

			if len(ma.Datas) == 0 {
				this.idle = true
				this.idleStart = time.Now()
			}

			pbuffer.Reset()
			err := pbuffer.Marshal(ma)
			if err != nil {
				log.Println("marshal message failure|err:%v", err)
				continue
			}
			frameBroadcast.Content = pbuffer.Bytes()
			this.usr.Range(func(key interface{}, value interface{}) bool {
				ses := value.(*Session)
				if ses != nil {
					ses.Codec.Write(frameBroadcast)
				}
				return true
			})

		case ses := <-this.quit:
			this.usr.Delete(ses)
			ses.Con.Close()
			this.left--
		}
	}

	// Clean
	this.usr.Range(func(key interface{}, value interface{}) bool {
		ses := value.(*Session)
		if ses != nil {
			ses.Con.Close()
		}
		return true
	})

	log.Println("Forward Quit Success")
}
